package com.young.rocketmq.simpleexample.producer;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 事务消息-生产者
 * 事务消息确保本地事务的执行和消息的发送者以原子方式执行
 * <p>
 * transactionCheckMax 设置消息的检查次数限制
 * <p>
 * 如果一条消息经过transactionCheckMax次检查，则broker
 * 将丢弃此消息并默认同时打印错误日志，开发人员可以覆盖
 * “AbstractTransactionCheckListener”来更改此行为
 * <p>
 * transactionTimeout 设置一定时间后进行消息检查，发送消息时候
 * 可以自定义设置
 * <p>
 * 事务消息可能被检查或消费不止一次
 * <p>
 * 事务消息的三种状态：
 * TransactionStatus.CommitTransaction 事务提交，标识允许消费者消费
 * TransactionStatus.RollbackTransaction 事务回滚，标识消息被删除，不能被消费者消费
 * TransactionStatus.Unknown 中间状态，意味着生产者本地事务方法执行完成后，不能确认是否
 * commit，所以需要进行生产者的check方法，来确定消息最终的状态
 *
 * @author ：<a href="mailto:youngkun2016@163.com">young</a>
 * @date ：Created in 2020/6/28
 */
public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    /**
     * 本地事务执行
     *
     * @param msg
     * @param arg
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.UNKNOW;
    }

    /**
     * 本地事务执行结果检查
     *
     * @param msg
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        Integer status = localTrans.get(msg.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}
